32f26642885fe9975103dc95321bb83c36273934,src/main/java/com/continuuity/data/operation/ttqueue/TTQueueNewOnVCTable.java,FifoDequeueStrategy,fetchNextEntries,#QueueConsumer#QueueConfig#QueueStateImpl#ReadPointer#,790
Before Change
// TODO: use raw Get instead of the workaround of incrementing zero
// TODO: move counters into oracle
long groupReadPointetr = table.incrementAtomicDirtily(makeRowKey(GROUP_READ_POINTER, consumer.getGroupId()), GROUP_READ_POINTER, 0);
if(groupReadPointetr + config.getBatchSize() >= queueState.getQueueWritePointer()) {
// Reached the end of queue as per cached QueueWritePointer,
// read it again to see if there is any progress made by producers
// TODO: use raw Get instead of the workaround of incrementing zero
After Change
}
@Override
public List<Long> fetchNextEntries(QueueConsumer consumer, QueueConfig config, QueueStateImpl queueState, ReadPointer readPointer) throws OperationException {
List<Long> newEntryIds = new ArrayList<Long>();
// If claimed entries exist, return them
long claimedEntryIdBegin = queueState.getClaimedEntryBegin();
long claimedEntryIdEnd = queueState.getClaimedEntryEnd();
if(claimedEntryIdBegin != INVALID_ENTRY_ID && claimedEntryIdEnd != INVALID_ENTRY_ID &&
claimedEntryIdEnd >= claimedEntryIdBegin) {
for(long i = claimedEntryIdBegin; i <= claimedEntryIdEnd; ++i) {
newEntryIds.add(i);
}
return newEntryIds;
}
final long batchSize = getBatchSize(config);
// Else claim new queue entries to process
QueuePartitioner partitioner=config.getPartitionerType().getPartitioner();